查看原文
其他

实时数仓之流式ETL实践

林小铂 大数据范式 2022-11-29


公众号推文规则变了,点击上方 "蓝色", 设为星



摘要:网易游戏资深开发工程师林小铂为大家带来网易游戏基于 Flink 的流式  ETL 建设的介绍。内容包括:
  1. 业务背景

  2. 专用 ETL

  3. EntryX 通用 ETL

  4. 调优实践

  5. 未来规划

 

一. 业务背景


网易游戏 ETL 服务概况


网易游戏的基础数据主要日志方式采集,这些日志通常是非结构化或半结构化数据,需要经过数据集成 ETL 才可以入库至实时或离线的数据仓库。此后,业务用户才可以方便地用 SQL 完成大部分数据计算,包括实时的 Flink SQL 和离线的 Hive 或 Spark。


网易游戏数据集成的数据流与大多数公司大同小异,主要有游戏客户端日志、游戏服务端日志和其他周边基础的日志,比如 Nginx access log、数据库日志等等。这些日志会被采集到统一的 Kafka 数据管道,然后经由 ETL 入库服务写入到 Hive 离线数据仓库或者 Kafka 实时数据仓库。
这是很常见的架构,但在我们在需求方面是有一些比较特殊的情况。


网易游戏流式 ETL 需求特点



首先,不同于互联网、金融等行业基本常用 MySQL、Postgres 等的关系型数据库,游戏行业常常使用 MongoDB 这类 schema-free 的文档型数据库。这给我们 ETL 服务带来的问题是并没有一个线上业务的准确的 schema 可以依赖,在实际数据处理中,多字段或少字段,甚至一个字段因为玩法迭代变更为完全不同的格式,这样的情况都是可能发生的。这样的数据异构问题给我们 ETL 的数据清洗带来了比较高的成本。
其次,也是由于数据库选型的原因,大部分业务的数据库模式都遵循了反范式设计,会刻意以复杂内嵌的字段来避免表间的 join。这种情况给我们带来的一个好处是,在数据集成阶段我们不需要去实时地去 join 多个数据流,坏处则是数据结构可能会非常复杂,多层嵌套十分常见。
然后,由于近年来实时数仓的流行,我们也同样在逐步建设实时数据仓库,所以复用现有的 ETL 管道,提取转换一次,加载到实时离线两个数据仓库,成为一个很自然的发展方向。
最后,我们的日志类型多且变更频繁,比如一个玩法复杂的游戏,可能有 1,000 个以上的日志类型,每两周可能就会有一次发版。在这样的背景下 ETL 出现异常数据是不可避免的。因此我们需要提供完善的异常处理,让业务可以及时得知数据异常和通过流程修复数据。


日志分类及特点




为了更好地针对不同业务使用模式优化,我们对不同日志类型的业务提供了不同的服务。我们的日志通常分为三个类型:运营日志、业务日志和程序日志。
运营日志记录的是玩家行为事件,比如登录帐号、领取礼包等。这类日志是最为重要日志,有固定的格式,也就是特定 header + json 的文本格式。数据的主要用途是做数据报表、数据分析还有游戏内的推荐,比如玩家的组队匹配推荐。
业务日志记录的是玩家行为以外的业务事件,这个就比较广泛,比如 Nginx access log、CDN 下载日志等等,这些完全没有固定格式,可能是二进制也可能是文本。主要用途类似于运营日志,但更加丰富和定制化。
程序日志记录是程序的运行情况,也就是平时我们通过日志框架打的 INFO、ERROR 这类日志。程序日志主要用途是检索定位运行问题,通常是写入 ES,但有时数量过大或者需要提取指标分析时,也会写入数据仓库。


网易游戏 ETL 服务剖析




针对这些日志分类,我们具体提供了三类 ETL 入库的服务。首先是运营日志专用的 ETL,这会根据运营日志的模式进行定制化。然后是通用的面向文本日志的 EntryX ETL 服务,它会服务于运营日志以外的所有日志。最后是 EntryX 无法支持的特殊 ETL 需求,比如有加密或者需要进行特殊转换的数据,这种情况下我们就会针对性地开发 ad-hoc 作业来处理。

二. 运营日志专用 ETL


运营日志 ETL 发展历程



运营日志 ETL 服务有着一个比较久的历史。大概在 2013 年,网易游戏就建立了基于 Hadoop Streaming + Python 预处理/后处理的第一版离线 ETL 框架。这套框架是平稳运行了多年。
在 2017 年的时候,随着 Spark Streaming 的崭露头角,我们开发了基于 Spark Streaming 的第二个版本,相当于一个 POC,但因为微批调优困难且小文件多等问题没有上线应用。
时间来到 2018 年,当时 Flink 已经比较成熟,我们也决定将业务迁移到 Flink 上,所以我们很自然地开发了基于 Flink DataStream 的第三版运营日志 ETL 服务。这里面比较特殊的一点就是,因为长久以来我们业务方积累了很多 Python 的 ETL 脚本,然后新版最重要的一点就是要支持这些 Python UDF 的无缝迁移。


运营日志 ETL 架构

接下来看下两个版本的架构对比。


在早期 Hadoop Streaming 的版本里面,数据首先会被 dump 到 HDFS 上,然后 Hadoop Streaming 启动 Mapper 来读取数据并通过标准输入的方式传递给 Python 脚本。Python 脚本里面会分为三个模块:首先预处理 UDF,这里通常会进行基于字符串的替换,一般用作规范化数据,比如有些海外合作厂商的时间格式可能跟我们不同,那么就可以在这里进行统一。预处理完的数据会进入通用的解析/转换模块,这里我们会根据运营日志的格式来解析数据,并进行通用转换,比如滤掉测试服数据。通用模块之后,最后还有一个后处理模块进行针对字段的转换,比如常见的汇率转换。之后数据会通过标准输出返回给 Mapper,然后 Mapper 再将数据批量写到 Hive 目录中。
我们用 Flink 重构后,数据源就由 HDFS 改为直接对接 Kafka,而 IO 模块则用 Flink 的 Source/Sink Operator 来代替原本的 Mapper,然后中间通用模块可以直接重写为 Java,剩余的预处理和后处理则是我们需要支持 Python UDF 的地方。


Python UDF 实现



在具体实现上,我们在 Flink ProcessFunction 之上加入了 Runner 层,Runner 层负责跨语言的执行。技术选型上是选了 Jython,而没有选择 Py4j,主要因为 Jython 可以直接在 JVM 里面去完成计算,不需要额外启动 Python 进程,这样开发和运维管理成本都比较低。而 Jython 带来的限制,比如不支持 pandas 等基于 c 的库,这些对于我们的 Python UDF 来说都是可接受的。
整个调用链是,ProcessFunction 在 TaskManager 被调用时会在 open 函数延迟初始化 Runner,这是因为 Jython 是不可序列化的。Runner 初始化时会负责资源准备,包括将依赖的模块加入 PYTHONPATH,然后根据配置反射调用 UDF 函数。
调用时,对于预处理 UDF Runner 会把字符串转化为 Jython 的 PyUnicode 类型,而对于后处理 UDF 则会把解析后的 Map 对象转为 Jython 的 PyDcitionary,分别作为两者的输入。UDF 可以调用其他模块进行计算,最终返回 PyObject,然后 Runner 再将其转换成 Java String 或者 Map,返回给 ProcessFunction 输出。


运营日志 ETL 运行时



刚刚是 UDF 模块的局部视图,我们再来看下整体的 ETL 作业视图。首先在我们提供了通用的 Flink jar,当我们生成并提交 ETL 作业到作业平台时,调度器会执行通用的 main 函数构建 Flink JobGraph。这时会从我们的配置中心,也就是 ConfigServer,拉取 ETL 配置。ETL 配置中包含使用到的 Python 模块,后端服务会扫描其中引用到的其他模块,把它们统一作为资源文件通过 YARN 分发功能上传到 HDFS 上。在 Flink JobManager 和 TaskManager 启动时,这些 Python 资源会被 YARN 自动同步到工作目录上备用。这就是整个作业初始化的过程。
然后因为 ETL 规则的小变更是很频繁的,比如新增一个字段或者变更一下过滤条件,如果我们每次变更都需要重启作业,那么作业重启带来的不可用时间会对我们的下游用户造成比较糟糕的体验。因此,我们对变更进行了分类,对于一些不影响 Flink JobGraph 的轻量级变更支持热更新。实现的方式是每个 TaskManager 启动一个热更新线程,定时轮询配置中心同步配置。



三. EntryX 通用 ETL


接下来介绍我们的通用 ETL 服务 EntryX。这里的通用可以分为两层意义,首先是数据格式上的通用,支持非结构化到结构化的各种文本数据,其次是用户群体的通用,目标用户覆盖数据分析、数据开发等传统用户,和业务程序、策划这些数据背景较弱的用户。


EntryX 基本概念



先介绍 EntryX 的三个基本概念,Source、StreamingTable 和 Sink。用户需要分别配置这个三个模块,系统会根据这些自动生成 ETL 作业。


Source 是 ETL 作业的输入源,通常是从业务端采集而来的原始日志 topic,或者是经过分发过滤后的 topic。这些 topic 可能只包含一种日志,但更多情况下会包含多种异构日志。
接下来 StreamingTable,一个比较通俗的名称就是流表。流表定义了 ETL 管道的主要元数据,包括如何转换数据,还有根据转换好的数据定义的流表 schema,将数据 schema 化。流表 schema 是最为关键的概念,它相当于 Table DDL,主要包括字段名、字段数据类型、字段约束和表属性等。为了更方便对接上下游,流表 schema 使用的是自研的 SQL-Like 的类型系统,里面会支持我们一些拓展的数据类型,比如 JSON 类型。
最后 Sink 负责流表到目标存储的物理表的映射,比如映射到目标 Hive 表。这里主要需要 schema 的映射关系,比如流表哪个字段映射到目标表哪个字段,流表哪个字段用作目标 Hive 表分区字段。在底层,系统会自动根据 schema 映射关系来提取字段,并将数据转换为目标表的存储格式,加载到目标表。


EntryX ETL 管道



再来看下 EntryX ETL 管道的具体实现。蓝色部分是外部存储系统,而绿色部分则是 EnrtyX 的内部模块。
数据首先从对接采集的原始数据 Topic 流入,经过 Source 摄入到 Filter。Filter 负责根据关键词过滤数据,通常来说我们要求过滤完的数据是有相同 schema 的。经过这两步数据完成 Extract,来到 Transform 阶段。
Transform 第一步是解析数据,也就是这里的 Parser。Parser 支持 JSON/Regex/Csv 三种解析,基本可以覆盖所有案例。第二步是对数据进行转换,这是由 Extender 负责的。Extender 通过内置函数或 UDF 计算衍生字段,最常见的是将 JSON 对象拉平展开,提取出内嵌字段。最后是 Formatter,Formatter 会根据之前用户定义的字段逻辑类型,将字段的值转为对应的物理类型。比如一个逻辑类型为 BIGINT 的字段,我们在这里会统一转为 Java long 的物理类型。
数据完成 Transform 之后来到最后的 Load 阶段。Load 第一步是决定数据应该加载到哪个表。Splitter 模块会根据每个表的入库条件(也就是一个表达式)来分流数据,然后再到第二步的 Loader 来负责将数据写到具体的外部存储系统。目前我们支持 Hive/Kafka 两种存储,Hive 支持 Text/Parquet/JSON 三种格式,而 Kafka 支持 JSON 和 Avro 两种格式。


实时离线统一 Schema



在 Entryx 的设计里数据可以被写入实时和离线两个数据仓库,也就是说同一份数据,但在不同的存储系统中以不同格式表示。从 Flink SQL 的角度来说是 schema 部分相同,但 connector 和 format 不同的两个表。而 schema 部分经常会随业务变更,而 connector 和 format(也就是存储系统和存储格式)是相对稳定的。那么一个很自然的想法就是,能不能将 schema 部分提取出来独立维护?实际上,这个抽象的 schema 已经存在了,就是我们在 ETL 提取的流表 schema。
在 EntryX 里面,流表 schema 是与序列化器、存储系统无关的 schema,作为 Single Source of Truth。基于流表 schema,加上存储系统信息和存储格式信息,我们就可以衍生出具体的物理表的 DDL。目前我们主要是支持 Hive/Kafka,如果之后要拓展至支持 ES/HBase 表也是非常方便。


实时数据仓库集成



EntryX 一个重要的定位是作为实时仓库的统一入口。刚刚其实已经多次提到 Kafka 表,但还没有说实时数仓是怎么做的。实时数仓的常见问题是 Kafka 并没有原生支持 schema 元数据的持久化。目前社区的主流解决方案是基于 Hive MetaStore 来保存 Kafka 表的元数据,并复用 HiveCatalog 来直接对接到 Flink SQL。



但这对于我们来说使用 Hive MetaStore 主要有几个问题:一是在实时作业里引入 Hive 依赖并与 Hive 耦合,这是很重的依赖,导致定义的表很难被其他组件复用,包括 Flink DataStream 用户;二是我们已经有 Kafka SaaS 平台 Avatar 来管理物理 schema,比如 Avro schema,如果再引入 Hive MetaStore 会导致元数据的割裂。因此,我们是拓展了 Avatar 平台的 schema 注册中心,同时支持逻辑 schema 和物理 schema。
那么实时数仓和 EntryX 的集成关系是:首先我们有 EntryX 的流表 schema,在新建 Sink 的时候调用 Avatar 的 schema 接口,根据映射关系生成逻辑 schema,而 Avatar 再根据 Flink SQL 类型与物理类型的映射关系生成 topic 的物理 schema。
与 Avatar schema 注册中心配套的还有我们自研的 KafkaCatalog,它负责读取 topic 的逻辑和物理 schema 来生成 Flink SQL 的 TableSource 或 TableSink。而对于一些 Flink SQL 以外的用户,比如 Flink DataStream API 的用户,他们也可以直接读取物理 schema 来享受到数据仓库的便利。


EntryX 运行时


和运营日志 ETL 类似,在 EntryX 运行时,系统会基于通用的 jar 和配置生成 Flink 作业,但这里有两种情况需要特别处理。




首先是一个 Kafka topic 往往有几十甚至上千种日志,那么对应其实有也几十甚至上千的流表,如果每个流表都单独运行在一个作业里,那么一个 topic 会可能会被读上千遍,这是非常大的浪费。因此,在作业运行时提供一个优化策略,可以将同个 source 的不同流表合并到一个作业里跑。比如图中,某个手游上传了 3 种日志到 Kafka,用户分别配置了玩家注册、玩家登录、领取礼包三个流表,那么我们可以这三个流表合并起来到一个作业,共享同一个 Kafka Source。
另外的一个优化是,一般情况下我们可以按照之前“提取转换一次,加载一次”的思路来将数据同时写到 Hive 和 Kafka,但是由于 Hive 或者说 HDFS 毕竟是离线系统,实时性比较差,写入在一些负载比较高的 HDFS 老集群经常会出现反压,同时阻塞上游,导致 Kafka 的写入也受到影响。在这种情况下,我们通常要分离加载到实时和离线的 ETL 管道,具体会取决于业务的 SLA 还有 HDFS 的性能。


四.调优实践


接下来给大家分享下我们在 ETL 建设中的调优实践经验。


HDFS 写入调优




首先是 HDFS 写入的调优。流式写入 HDFS 场景中老生常谈的一个问题便是小文件过多。通常来说小文件和实时性是鱼与熊掌不可兼得。如果要延迟低,那么我们需要频繁地滚动文件来提交数据,必然导致小文件过多。
小文件过多主要造成两个问题:一从 HDFS 集群管理角度看,小文件会占用大量的文件数和 block 数,浪费 NameNode 内存;二是从用户角度看,读写效率都会降低,因为写的时候要更频繁地调用 RPC 和 flush 数据,造成更多的阻塞,有时甚至造成 checkpoint 超时,而读时则需要打开更多的文件才能读完数据。


HDFS 写入调优 - 数据流预分区



我们在优化小文件问题时做的一点调优是对数据流先做一遍预分区,具体来说,便是在 Flink 作业内部先基于目标 Hive 表进行一次 keyby 分区,让同一个表的数据尽量集中在少数的几个 subtask 上。




举个例子,假设 Flink 作业并行度为 n,而目标 Hive 分区数为 m 个。因为每个 subtask 都有可能读到任意分区的数据,在默认的各 subtask 完全并行的情况下,每个 subtask 都会写所有分区,造成总体的写入文件数是 n * m。假设 n 是 100,m 是 1000,按 10 分钟滚一次文件算,每天会造成 14,400,000 个文件,这对于很多老集群来说是非常大的压力。
如果经过数据流分区的优化之后,我们就可以限制住 Flink 并行度带来的增长。比如我们 keyby hive 表字段,并加入范围为 0-s 整数的盐来避免数据倾斜,那么分区最多会被 s 个 subtask 读写。假设 s 是 5,比起原先 n 是 100,那么我们就将原本的文件数降低为原来 20 分之一。


基于 OperatorState 的 SLA 统计



第二个我想分享的是我们的 SLA 统计工具。背景是我们的用户经常会通过 Web UI 来进行调试和问题的排查,比如不同 subtask 的输入输出数目,但这些 metric 会因为作业重启或者 failover 而重置,因此我们开发了基于 OperatorState 的 SLA-Utils 工具来统计数据的输入和分类输出。这个工具设计得非常轻量级,可以很容易集成到我们自己的服务或者用户的作业里面。



在 SLA-Utils 里面,我们支持了三种 metric。首先是标准的 metric,有 recordsIn/recordsOut/recordsDropped/recordsErrored,分别对应输入记录数/正常输出记录数/被过滤掉的记录数/处理异常的记录数。通常来说 recordsIn 就等于后面三者的总和。第二种用户可以自定义的 metric,通常可以用于记录更详细的原因,比如是 recordsEventTimeDropped 代表数据是因为 event time 被过滤的。
那么上述两种 metric 静态的,也就是说 metric key 在作业运行前就要确定,此外 SLA-Utils 还支持在运行时动态注册的 TTL metric。这种 metric 通常有动态生成的日期作为前缀,在经过 TTL 的时间之后被自动清理。TTL metric 主要可以用于做天级别时间窗口的统计。这里比较特别的一点是,因为 OperatorState 是不支持 TTL 的,SLA-Utils 是在每次进行 checkpoint 快照的时候进行一次过滤,剔除掉过期的 metric,以实现 TTL 的效果。
那么在 State 保存了 SLA 指标之后要做的就是暴露给用户。我们目前的做法是通过 Accumulater 的方式来暴露,优点是 Web UI 有支持,开箱即用,同时 Flink 可以自动合并不同的 subtask 的 metric。缺点在于没有办法利用 metric reporter 来 push 到监控系统,同时因为 Acuumulater 是不能在运行时动态注销的,所以使用 TTL metric 会有内存泄漏的风险。因此,在未来我们也考虑支持 metric group 来避免这些问题。


数据容错及恢复



最后再分享下我们在数据容错和恢复上的实践。




以很多最佳实践相似,我们用 SideOutput 来收集 ETL 各环节中出错的数据,汇总到一个统一的错误流。错误记录中包含我们预设的错误码、原始输入数据以及错误类和错误信息。一般情况下,错误数据会被分类写入 HDFS,用户通过监控 HDFS 目录可以得知数据是否正常。




那么存储好异常数据后,下一步就是要恢复数据。这通常有两种情况。
一是数据格式异常,比如日志被截断导致不完整或者时间戳不符合约定格式,这种情况下我们一般通过离线批作业来修复数据,重新回填到原有的数据管道。
二是 ETL 管道异常,比如数据实际的 schema 有变更但流表配置没有更新,可能会导致某个字段都是空值,这时我们的处理办法是:首先更新线上的流表配置为最新,保证不再产生更多异常数据,这时 Hive 里面仍有部分分区是异常的。然后,我们发布一个独立的补数作业来专门修复异常的数据,输出的数据会写到一个临时的目录,并在 hive metastore 上切换 partition 分区的 location 来替换掉原来的异常目录。因此这样的一个补数流程对离线查询的用户来说是透明的。最后我们再在合适的时间替换掉异常分区的数据并恢复 location。



五.未来规划


最后介绍下我们的未来规划。





  • 第一个是数据湖的支持。目前我们的日志绝大多数都是 append 类型,不过随着 CDC 和 Flink SQL 业务的完善,我们可能会有更多的 update、delete 的需求,因此数据湖是一个很好的选择。

  • 第二个会提供更加丰富的附加功能,比如实时的数据去重和小文件的自动合并。这两个都是对业务方非常实用的功能。

  • 最后是一个支持 PyFlink。目前我们的 Python 支持只覆盖到数据集成阶段,后续数据仓库的 Python 支持我们是希望通过 PyFlink 来实现。



end


1.人-货-场 指标体系建设 之 SQL案例分析
2.数据指标体系建设方法
3.基于Flink商品实时推荐系统(附源码)

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存